agentmux_srv\backend\blockcontroller/
session_stats.rs1use std::sync::Arc;
18use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
19
20use crate::backend::obj::MetaMapType;
21use crate::backend::storage::wstore::WaveStore;
22
23pub const META_SESSION_START_TS_MS: &str = "session:start_ts_ms";
25pub const META_SESSION_LAST_ACTIVITY_MS: &str = "session:last_activity_ms";
26pub const META_SESSION_LINE_COUNT: &str = "session:line_count";
27pub const META_SESSION_TOKEN_ESTIMATE: &str = "session:token_estimate";
28
29const FLUSH_DEBOUNCE: Duration = Duration::from_secs(1);
31
32fn now_ms() -> i64 {
34 SystemTime::now()
35 .duration_since(UNIX_EPOCH)
36 .unwrap_or_default()
37 .as_millis() as i64
38}
39
40pub struct SessionStatsAccumulator {
45 block_id: String,
47 start_ts_ms: i64,
49 last_activity_ms: i64,
51 line_count: u64,
53 token_estimate: u64,
55 last_flush: Option<Instant>,
57}
58
59impl SessionStatsAccumulator {
60 pub fn new(block_id: String) -> Self {
62 Self {
63 block_id,
64 start_ts_ms: 0,
65 last_activity_ms: 0,
66 line_count: 0,
67 token_estimate: 0,
68 last_flush: None,
69 }
70 }
71
72 pub fn record_line(&mut self, line_len: usize, wstore: &Option<Arc<WaveStore>>) {
78 let ts = now_ms();
79 let is_first = self.start_ts_ms == 0;
80
81 if is_first {
82 self.start_ts_ms = ts;
83 }
84 self.last_activity_ms = ts;
85 self.line_count += 1;
86 self.token_estimate += (line_len / 4) as u64;
87
88 let should_flush = is_first || match self.last_flush {
90 None => true,
91 Some(last) => last.elapsed() >= FLUSH_DEBOUNCE,
92 };
93
94 if should_flush {
95 if let Some(ref store) = wstore {
96 self.flush(store);
97 }
98 }
99 }
100
101 fn flush(&mut self, wstore: &Arc<WaveStore>) {
105 let oref_str = format!("block:{}", self.block_id);
106 let mut meta_update = MetaMapType::new();
107
108 if self.start_ts_ms != 0 {
109 meta_update.insert(
110 META_SESSION_START_TS_MS.to_string(),
111 serde_json::json!(self.start_ts_ms),
112 );
113 }
114 meta_update.insert(
115 META_SESSION_LAST_ACTIVITY_MS.to_string(),
116 serde_json::json!(self.last_activity_ms),
117 );
118 meta_update.insert(
119 META_SESSION_LINE_COUNT.to_string(),
120 serde_json::json!(self.line_count),
121 );
122 meta_update.insert(
123 META_SESSION_TOKEN_ESTIMATE.to_string(),
124 serde_json::json!(self.token_estimate),
125 );
126
127 match crate::server::service::update_object_meta(wstore, &oref_str, &meta_update) {
128 Ok(()) => {
129 tracing::trace!(
130 block_id = %self.block_id,
131 line_count = self.line_count,
132 token_estimate = self.token_estimate,
133 "session stats flushed"
134 );
135 }
136 Err(e) => {
137 tracing::warn!(
138 block_id = %self.block_id,
139 error = %e,
140 "failed to flush session stats"
141 );
142 }
143 }
144
145 self.last_flush = Some(Instant::now());
146 }
147}
148
149#[cfg(test)]
150mod tests {
151 use super::*;
152
153 #[test]
154 fn test_accumulator_first_line_sets_start_ts() {
155 let mut acc = SessionStatsAccumulator::new("blk-1".to_string());
156 acc.record_line(100, &None);
158 assert_ne!(acc.start_ts_ms, 0);
159 assert_eq!(acc.line_count, 1);
160 assert_eq!(acc.token_estimate, 25); }
162
163 #[test]
164 fn test_accumulator_multiple_lines() {
165 let mut acc = SessionStatsAccumulator::new("blk-2".to_string());
166 acc.record_line(40, &None);
167 acc.record_line(80, &None);
168 acc.record_line(120, &None);
169 assert_eq!(acc.line_count, 3);
170 assert_eq!(acc.token_estimate, 60);
172 }
173
174 #[test]
175 fn test_accumulator_start_ts_not_reset_on_second_line() {
176 let mut acc = SessionStatsAccumulator::new("blk-3".to_string());
177 acc.record_line(10, &None);
178 let first_ts = acc.start_ts_ms;
179 acc.record_line(10, &None);
180 assert_eq!(acc.start_ts_ms, first_ts, "start_ts must not change after first line");
181 }
182
183 #[test]
184 fn test_debounce_constants() {
185 assert_eq!(FLUSH_DEBOUNCE, Duration::from_secs(1));
186 }
187}